DartVM MessageHandler
介绍
每个 Isolate 都有一个 message handler,处理消息对列上所有的传入消息。
该类位于 runtime/vm/message_handler.h。
类继承关系
MessageHandler 基类是一个抽象类,有多种实现类,具体关系如下:
三个类分别是:
- MessageHandler
- IsolateMessageHandler
- NativeMessageHandler
MessageHandler 基类
成员
两条消息队列
消息队列位于 MessageHandler 内,作为成员,有两个:
MessageQueue* queue_;
MessageQueue* oob_queue_;
其中:
- quque_ 是消息队列
- oob_queue_ 是控制消息队列
线程池
MessageHandler 还持有了对线程池的引用:
ThreadPool* pool_;
声明周期回调
起停生命周期还有回调:
StartCallback start_callback_;
EndCallback end_callback_;
CallbackData callback_data_;
公有方法
Run
// 在线程池上运行 message handler
// 在处理消息之前,会先调用 StartFunction 回调
// message handler 开始运行直到结束
// 如果消息处理过程中遇到问题,message handler 会提前退出
void Run(ThreadPool* pool,
StartCallback start_callback,
EndCallback end_callback,
CallbackData data);
handleNextMessage
有两个:
// 处理下一条消息
// 只能在 handler 没有在线程池上运行的时候调用
// 处理成功返回 true
MessageStatus HandleNextMessage();
// 处理下一条 OOB 处理下一条消息
// 只能在 handler 没有在线程池上运行的时候调用
// 处理成功返回 true
MessageStatus HandleOOBMessages();
暂停
MessageHandler 消息处理还支持暂停一段时间:
// 根据条件变量阻塞线程,直到有新消息到来,然后处理所有消息
MessageStatus PauseAndHandleAllMessages(int64_t timeout_millis);
消息类型查找
队列中是否存在某种类型(普通消息和 OOB 消息)的消息:
bool HasOOBMessages();
bool HasMessages();
活跃端口
MessageHandler 与端口有所关联:
// A message handler tracks how many live ports it has.
bool HasLivePorts() const { return live_ports_ > 0; }
intptr_t live_ports() const { return live_ports_; }
暂停计数
MessageHandler 对于暂停状态的维护,采用了引用计数的方式:
bool paused() const { return paused_ > 0; }
void increment_paused() { paused_++; }
void decrement_paused() {
ASSERT(paused_ > 0);
paused_--;
}
保护方法
发送消息
通过 PostMessage,可以向消息队列中发送一条消息:
// 向当前 handler 的消息队列发送消息
// 如果 before_events 为 true,将消息插到队头
void PostMessage(std::unique_ptr<Message> message,
bool before_events = false);
消息处理
消息处理方法,抽象方法,由子类给出实现。MessageHandler 对消息的解析处理,在该方法中进行:
// Handles a single message. Provided by subclass.
// Returns true on success.
virtual MessageStatus HandleMessage(std::unique_ptr<Message> message) = 0;
私有方法
消息取出
从消息队列中取出一条消息,OOB 消息的优先级高于普通消息:
std::unique_ptr<Message> DequeueMessage(Message::Priority min_priority);
清理 OOB 栈
OOB 栈是允许清理的:
void ClearOOBQueue();
构造场景
通过构造能够看出该类的使用场景,分别寻找这3个类的构造场景。
Isolate setter/getter
在 Isolate 的 C/C++ 层实现中,可以通过 setter/getter 设置 message handler:
MessageHandler* message_handler() const { return message_handler_; };
void set_message_handler(MessageHandler* value) { message_handler_ = value; }
MessageHandler* message_handler_ = nullptr;
Dart_NewNativePort
位于 runtime/vm/native_api_impl.cc:
DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
Dart_NativeMessageHandler handler,
bool handle_concurrently) {
// ……
// Start the native port without a current isolate.
IsolateLeaveScope saver(Isolate::Current());
NativeMessageHandler* nmh = new NativeMessageHandler(name, handler);
Dart_Port port_id = PortMap::CreatePort(nmh);
PortMap::SetPortState(port_id, PortMap::kLivePort);
nmh->Run(Dart::thread_pool(), NULL, NULL, 0);
return port_id;
}
PortMap::CreatePort
messageHandler 跟 Port 也有很深的关联:
Dart_Port PortMap::CreatePort(MessageHandler* handler) {
// ……
const Dart_Port port = AllocatePort();
// The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
// by the [PortMap::mutex_] we already hold.
MessageHandler::PortSetEntry isolate_entry;
isolate_entry.port = port;
handler->ports_.Insert(isolate_entry);
Entry entry;
entry.port = port;
entry.handler = handler;
entry.state = kNewPort;
ports_->Insert(entry);
// ……
return entry.port;
}
Run 运行消息处理
消息队列只是一个队列,真正在线程池上运行的是消息处理器(MessageHandler),来消费掉这个队列。
Run 方法的作用是,在线程上启动消息处理器(被封装成 MessageHandlerTask):
void MessageHandler::Run(ThreadPool* pool,
StartCallback start_callback,
EndCallback end_callback,
CallbackData data) {
MonitorLocker ml(&monitor_);
// ......
pool_ = pool;
start_callback_ = start_callback;
end_callback_ = end_callback;
callback_data_ = data;
task_running_ = true;
// 在线程池上运行任务
const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
ASSERT(launched_successfully);
}
调用处,都在哪里进行这个启动操作呢?
- runtime/vm/kernel_isolate.cc 的 RunKernelTask 里,启动了 isolate 的 messageHanlder
- KernelIsolate::Start
- runtime/vm/native_api_impl.cc 的 Dart_NewNativePort 中,启动了 NativeMessageHandler 的 messageHandler
postMessage 发消息
通过 Port 机制,向消息循环发送消息,是通过该方法实现的。
void MessageHandler::PostMessage(std::unique_ptr<Message> message,
bool before_events) {
Message::Priority saved_priority;
{
MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
Isolate* source_isolate = Isolate::Current();
// ......
}
saved_priority = message->priority();
// 消息入队
if (message->IsOOB()) {
oob_queue_->Enqueue(std::move(message), before_events);
} else {
queue_->Enqueue(std::move(message), before_events);
}
if (paused_for_messages_) {
ml.Notify();
}
// 如果消息队列没有在线程上运行,那么在线程池上运行当前任务队列
if (pool_ != nullptr && !task_running_) {
task_running_ = true;
const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
}
}
// Invoke any custom message notification.
// 最后还有一个通知操作
MessageNotify(saved_priority);
}
MessageNotify 消息通知
在基类中只给了一个空实现。在派生类中是怎么使用这个方法的?
在 IsolateMessageHandler 的 MessageNotify 中,加入了额外处理逻辑。可以看到,有一个 OOB 类型消息优先响应,以及触发 embedder 的回调。